查看原文
其他

Kotlin Flow 操作符:篇幅很大 你忍一下

AndroidPub 2022-07-13

作者:搬砖小子出现了
原文:https://juejin.cn/post/6989536876096913439

Kotlin Flow 基本上可以替代 RxJava,其提供了诸多操作符来处理数据。本篇文章为其分一下类,演示下基本用法。

1名词解释

冷流

  • 无消费者则不生产数据

热流

  • 无消费者也会生产数据

2Flow 分类

标准Flow

  • 标准的 Flow 是冷流,当订阅后才产生数据流
  • 每个流只要一个订阅者
//构建
val testFlow = flow<String>{
    emit("hello")
    emit("flow")
}

//接收
coroutineScope.launch{
  testFlow.collect{ value->
     println(value)
  }
}

//打印
hello
flow

StateFlow

  • 有状态的 Flow ,可以有多个观察者,热流。
  • 构造时需要传入初始值 initialState
  • 常用作与 UI 相关的数据观察,可以替代 LiveData
//创建
val uiState=MutableStateFlow(Result.Loading)

//监听
coroutineScope.launch{
    uiState.collect{ value->
         println(value)
    }    
}

//赋值
uiState.value=Result.Sucess 

//打印结果
Result.Loading
Result.Sucess

SharedFlow

  • 可定制化的 StateFlow,可以有多个观察者,热流.
  • 无需初始值,有三个可选参数:
    • replay :重播给新订阅者的值的数量(默认为零)。
    • extraBufferCapacity :除了replay之外缓冲的值的数量
    • onBufferOverflow :配置缓冲区溢出策略,默认会挂起emit
//创建
val signEvent=MutableSharedFlow <String> ()

//监听
coroutineScope.launch{
    signEvent.collect{ value->
         println(value)
    }    
}
//赋值
signEvent.tryEmit("hello")
signEvent.tryEmit("shared flow")

//打印结果
hello
shared flow

3操作符分类

  • Flow的创建 : 创建 flow 的方法
  • 中间操作符 : 一般来说是用来执行一些操作,不会立即执行,返回值还是个 Flow
  • 末端操作符 :会触发流的执行,返回值不是Flow

4创建 Flow

flow { }

  • 创建 Flow 的基本方法
  • 使用 emit 发射单个值
  • 使用 emitAll 发射一个流 ,类似 list.addAll(anotherList)
flow<Int>{
    emit(1)
    emit(2)
    emit(flowOf(1,2,3))
}

flowOf

快速创建 flow ,类比 listOf()

val testFLow = flowOf(1,2,3)
launch{
    testFLow.collect{ value->
        print(value)
    }
}

//打印结果
1
2
3

asFlow

将其他数据转换成 普通的flow ,一般是集合向Flow的转换

listOf(1,2,3).asFlow()

callbackFlow

将回调方法改造成flow , 类似suspendCoroutine

fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
    val callback = object : Callback {
        override fun onNextValue(value: T) {
            send(value)
                .onFailure { throwable ->
                
                }

        }

        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }

        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    awaitClose { api.unregister(callback) }
}

emptyFlow

返回一个空流

emptyFlow<Int>()

channelFlow

在一般的flow在构造代码块中不允许切换线程,ChannelFlow 则允许内部切换线程

//构建

val channelFlow = channelFlow<String> {
 send("hello")
    withContext(Dispatchers.IO) {
     send("channel flow")
    }
 }

//监听

coroutineScope.launch{
    signEvent.collect{ value->
         println(value)
    }
}

5末端操作符

collect

触发flow的运行 。通常的监听方式

launch{
    flowOf(1,2,3).collect{ value->
        print(value)
    }
}

// 1 2 3

collectIndexed

带下标的收集操作

launch{
    flowOf(1,2,3).collectIndexed{ value->
        print(value)
    }
}
// 1 2 3

collectLatest

  • 与 collect的区别是,有新值发出时,若此时上个收集尚未完成,则会取消掉上个收集操作
  • 只想要最新的数据,中间值可丢弃时使用此方式
flow {
 emit(1)
    delay(50)
    emit(2)
 } .collectLatest { value ->

 println("Collecting $value")
    delay(100// Emulate work
    println("$value collected")

 }

//输出
Collecting 1
Collecting 2
2 collected

toCollection

将结果添加到集合

val array = arrayListOf(0)
launch {
  flow {
     emit(1)
     emit(2)
    } .toCollection(array)
 }

array.forEach { value->
  print(value)
 }

//打印结果 

0 1 2

toList

将结果转换为List

flow {
   emit(1)
   emit(2)
} .toList().forEach{value->
    print(value)
}
// 1 2

toSet

将结果转换为Set

flow {
   emit(1)
   emit(1)
} .toSet().forEach{value->
    print(value)
}
// 1

launchIn

  • 直接触发流的执行
  • 入参为 coroutineScope
  • 一般不会直接调用,会搭配别的操作符一起使用,如onEach, onCompletion等。
  • 返回值是Job
flow {
   emit(1)
   emit(2)
 }.launchIn ( lifecycleScope )

last

返回流中的最后一个值, 不允许为空

val myFlow= flow {
   emit(1)
   emit(2)
 }

launch{
    print(myFlow.last())
}

// 2

lastOrNull

返回流中的最后一个值, 可以为空

val myFlow= emptyFlow<Int>()
launch{
    print(myFlow.lastOrNull())
}

// null

first

返回流中的第一个值 ,如果为空会抛异常

val myFlow= flow {
   emit(1)
   emit(2)
 }
launch{
    print(myFlow.first())
}

// 1

firstOrNull

返回流的第一个值 ,可以为空

val myFlow= emptyFlow<Int>()
launch{
    print(myFlow.firstOrNull())
}
// null

single

  • 接收流中的第一个值 ,区别于first()
  • 如果为空或者发了不止一个值,则都会报错
val myFlow= flow {
     emit(1)
}

launch {
     print(myFlow.single()) // 1
}

val myFlow1= flow {
   emit(1)
   emit(2)
 }

launch {
   print(myFlow 1 . single ()) // error
}

singleOrNull

  • 接收流中的第一个值 ,可以为空
  • 但如果发了不止一个值,依旧会报错
val myFlow= flow {
 emit(1)
}

launch  {
 print(myFlow. singleOrNull ()) // 1
}

count

  • 返回流中发射的数据的个数。
  • 类似 list.size()
  • 不能在 sharedFlow 等热流中使用
val myFlow= flow {
emit(1)
emit(2)
}
launch{
print(myFlow.count())
}
//2

fold

从初始值开始执行遍历,将结果作为下个执行的参数

val sum= flowOf(234)
            .fold(1, { result, value ->
                 result + value
            })
// sum = 10, 相当于 1 + 2 + 3 + 4

reduce

fold 类似, 区别是无初始值

val result= flowOf(123)
                .reduce { acc, value ->
                     acc + value
                }
 //result = 6   1 + 2  +3 

6回调操作符

onStart

  • 在上游流开始之前被调用。
  • 可以发出额外元素, 也可以处理其他事情,比如发埋点
flow<Result>{a2111
   emit(Result.Success)
}.onStart{
   emit(Result.Loading)
}

onCompletion

  • 在流取消或者结束时调用。
  • 可以执行发送元素,发埋点等操作
flow<Result>{
   emit(Result.Success)
}.onCompletion{
   emit(Result.End)
}

onEach

在上游向下游发出元素之前调用

flow<Int>{
emit(1)
emit(2)
emit(3)
}.onEach{ value->
println(value)
}.launchIn(lifecycleScope)

// 打印结果
1
2
3

onEmpty

  • 当流完成却没有发出任何元素时回调
  • 可以用来兜底
emptyFlow<String>().onEmpty {
   emit("兜底数据")
 } .launchIn(lifecycleScope)

onSubscription

  • SharedFlow 专属操作符 (StateFlow是SharedFlow 的一种特殊实现), 在建立订阅之后回调。
  • onStart 有些区别 ,SharedFlow 是热流,如果在 onStart 里发送值,则下游可能接收不到。
val state = MutableSharedFlow<String>().onSubscription {
     emit("onSubscription")
 }

launch{
    state.collect { value->
        println(value)
    }
}


//打印结果

onSubscription

7变换操作符

map

  • 将发射的数据进行变化
  • lambda 的返回值为最终发送的值
flow {
    emit(1)
    emit(2)
 } .map { value ->
    value * 2
 } .collect {
    println(value)
}

//打印结果
2
4

mapLatest

类比 collectLatest ,当有新值发送时如果上个变换还没结束,会先取消掉

flow {
    emit("a")
    delay(100)
    emit("b")
}.mapLatest { value ->
    println("Started computing $value")
    delay(200)
    "Computed $value"
}.collect {value->
 print(value)
}

// 打印结果
Started computing a
Started computing b
Computed b

mapNotNull

仅发射 map 后不为空的值

flow {
    emit("a")
    emit("b")
 } .mapNotNull { value ->
  if (value != null) {
        value
    } else {
        null
 }
 }.collect { value ->
    print(value)
}
// 结果

b

transform

  • 对发射的值进行变换
  • 区别于maptransform的接收者是FlowCollector ,它非常灵活,可以变换、跳过它或多次发送。
flow {
    emit(1)
    emit(2)
 } .transform { value ->
  if (value == 1) {
        emit("value :$value*2")
    }
    emit("transform :$value")
 }.collect { value->
  println(value)
}

// 打印结果

value : 1*2
transform :1
transform :2

transformLatest

类比 mapLatest ,当有新值发送时如果上个变换还没结束,会先取消掉

flow {
  emit("a")
  delay(100)
  emit("b")
 }.transformLatest { value ->
  emit(value)
  delay(200)
  emit(value + "_last")
 }.collect {value->
  println(value)
}


// 打印结果
a
b
b_last

transformWhile

返回值是 Boolean , 如果为 False则不再进行后续变换, 为 True则继续执行

flow {
    emit("a")
    emit("b")
 } .transformWhile { value ->
    emit(value)
    true
 } .collect { value->
  println(value)
}


//结果
a
b

flow {
    emit("a")
    emit("b")
 }.transformWhile { value ->
    emit(value)
    false
 }.collect { value->
    println(value)
}

//结果
a

asStateFlow

  • MutableStateFlow 转换为 StateFlow ,就是变成不可变的。
  • 常用在对外暴露属性时使用
private val _uiState = MutableStateFlow<UIState>(Loading)

val uiState = _uiState.asStateFlow()

asSharedFlow

  • MutableSharedFlow 转换为 SharedFlow ,即变成不可变的
  • 常用在对外暴露属性时使用
private val _uiState = MutableStateFlow<UIState>(Loading)

val uiState = _uiState.asStateFlow()

receiveAsFlow

  • Channel 转换为 Flow
  • 可以有多个观察者,但不是多播,可能会轮流收到值。
private val _event = Channel<Event>()

val event= _event.receiveAsFlow() 

consumeAsFlow

将Cha`nnel 转换为Flow ,但不能多个观察者(会crash)!

private val _event = Channel<Event>()

val event= _event.consumeAsFlow () 

withIndex

将结果包装成IndexedValue 类型

flow {
    emit("a")
    emit("b")
 } .withIndex().collect {
  print(it.index + ": " + it.value)
}


//结果
0 : a
1 : b

scan

  • fold 相似,区别是 fold 返回的是最终结果
  • scan 返回的是个 flow ,会把初始值和每一步的操作结果发送出去
flowOf(123).scan(0) { acc, value ->
    acc + value 
 }.collect {
  print(it)
}

// 0 1 3 6
acc 是上一步操作的结果, value 是发射的值

0 是 初始值 
1 是 0 + 1 = 1
3 是 1 + 2 = 3
6 是 3 + 3 = 6

produceIn

  • 转换为 ReceiveChannel , 不常用。
  • 注意 Channel 内部有 ·ReceiveChannelSendChannel 之分
flowOf(123).produceIn(this)
               .consumeEach { value->
                    print(value)
               }
               
//1 2 3

runningFold

区别于 fold ,就是返回一个新流,将每步的结果发射出去。

flowOf(123).runningFold(1){ acc, value ->
     acc + value
 } .collect { value->
     print(value)
 }
 // 1 2 4 7

runningReduce

区别于 reduce ,就是返回一个新流,将每步的结果发射出去。

flowOf(123).runningReduce(1) { acc, value ->
     acc + value
 } .collect { value->
     print(value)
 }
 // 1 3 6

shareIn

  • 将普通flow 转化为 SharedFlow , 其有三个参数

    • scope:  CoroutineScope 开始共享的协程范围
    • started:  SharingStarted 控制何时开始和停止共享的策略
    • replay: Int = 0 发给 新的订阅者 的旧值数量
  • 其中 started 有一些可选项:

    • Eagerly : 共享立即开始,永不停止
    • Lazily : 当第一个订阅者出现时,永不停止
    • WhileSubscribed : 在第一个订阅者出现时开始共享,在最后一个订阅者消失时立即停止(默认情况下),永久保留重播缓存(默认情况下)
    • WhileSubscribed 具有以下可选参数:
    • stopTimeoutMillis :配置最后一个订阅者消失到协程停止共享之间的延迟(以毫秒为单位)。默认为零(立即停止)。
    • replayExpirationMillis :共享的协程从停止到重新激活,这期间缓存的时效
val share = flowOf(1,2,3).shareIn(this,SharingStarted.Eagerly)

//可以有多个观察者
state.collect{value->
  print(value)
}

stateIn

  • 将普通flow 转化为 StateFlow 。其有三个参数:
    • scope - 开始共享的协程范围
    • started - 控制何时开始和停止共享的策略
    • initialValue - 状态流的初始值
val  state = flowOf(Success).stateIn(lifecycleScope,SharingStarted.Eagerly,Loading)

state.collect{value->
  print(value)
}
// Loading  Success

stateInsharedIn 通常用在其他来源的flow的改造监听,不会像上面那样使用。


8过滤操作符

filter

筛选出符合条件的值

flow {
    emit("a")
    emit("b")
}.filter { value ->
    value == "a"
}.collect { value->
    print(value)
}


//结果
a

filterInstance

筛选对应类型的值


flow {
    emit("a")
    emit("b")
    emit(1)
 }.filterIsInstance<String>().collect { value->
    print(value)
 }

//结果

a
b

filterNot

筛选不符合条件相反的值,相当于filter取反

flow {
    emit("a")
    emit("b")
 }.filterNot { it == "a" } .collect { value ->
   print(value)
}



//结果
b

filterNotNull

筛选不为空的值

flow {
    emit("a")
    emit(null)
    emit("b")
 }.filterNotNull().collect { value->
  print(value)
}


//结果
a
b

drop

  • 入参countint类型
  • 作用是丢弃掉前 n 个的值
flow {
    emit(1)
    emit(2)
    emit(3)
 }.drop(2).collect { value ->
  print(value)
}


//结果
3

dropWhile

  • 这个操作符有点特别,和 filter 不同在于 它是找到第一个不满足条件的,返回其和其之后的值。
  • 如果首项就不满足条件,则是全部返回。
flow {
 emit(3)
 emit(1//从此项开始不满足条件
 emit(2)
 emit(4)
}. dropWhile { it == 3  } .collect { value ->
  print(value)
}


//结果 
1 2 4


flow {
 emit(1//从首项开始就不满足条件
 emit(2)
 emit(3)
 emit(4)
}. dropWhile { it == 3  } .collect { value ->
 print(value)
}

//结果

1 2 3 4

take

返回前 n个 元素



flow {
    emit(1)
    emit(2)
    emit(3)
 } .take(2) .collect { value ->
    print(value)
}

//结果
1
2

takeWhile

  • 找第一个不满足条件的项,但是取其之前的值 ,和dropWhile 相反。
  • 如果第一项就不满足,则为空流
flow {
    emit(1)
    emit(2)
    emit(3//从此项开始不满足条件
    emit(4)
 } .takeWhile { it <3  } .collect { value ->
    print(value)
}

//结果
1 2 


flow {
    emit(3)  //从此项开始不满足条件
    emit(1)
    emit(2)
    emit(4)
 } .takeWhile { it <3  } .onEmpty {
  print( "empty")
 }.collect { value ->
  print(value)
}


//结果
empty

debounce

  • 防抖节流 ,指定时间内的值只接收最新的一个,其他的过滤掉。
  • 适合搜索联想场景

flow {
    emit(1)
    delay(90)
    emit(2)
    delay(90)
    emit(3)
    delay(1010)
    emit(4)
    delay(1010)
    emit(5)
}.debounce(1000)

 // 3 4 5

sample

  • 采样
  • 给定一个时间周期,仅获取周期内最新发出的值
flow {
    repeat(10) {
        emit(it)
        delay(110)
    }
}.sample(200)


// 1 3 5 7 9

//图示

       【1

|-----------|

1          200  

               2    【3

           |------------|

          200           400

distinctUntilChangedBy

  • 去重操作符,判断连续的两个值是否重复,可以选择是否丢弃重复值。
  • keySelector: (T) -> Any? 指定用来判断是否需要比较的 key
  • 有点类似 Recyclerview 的 DiffUtil 机制。
flowOf(
    Funny(name = "Tom", age = 8),
    Funny(name = "Tom", age = 12),
    Funny(name = "Tom", age = 12)
).distinctUntilChangedBy { it.name } .collect { value ->
     print(value.toString())
}

// Funny(name=Tom, age=8)

distinctUntilChanged

  • 过滤用,distinctUntilChangedBy 的简化调用 。
  • 连续两个值一样,则跳过发送
flowOf(1, 1, 3,1).distinctUntilChanged()
.collect { value ->
print(value)
}

// 1 3 1

9组合操作符

combine

组合每个流最新发出的值。

val flow = flowOf(12).onEach { delay(10) }
val flow2 = flowOf("a""b""c").onEach { delay(15) }
flow.combine(flow2) { i, s -> i.toString() + s } .collect {
  println(it) // Will print "1a 2a 2b 2c"
}

combineTransform

顾名思义 combine + transform

val numberFlow = flowOf(12).onEach { delay(10) }
val stringFlow = flowOf("a""b""c").onEach { delay(15) }

numberFlow.combineTransform(stringFlow) { number, string ->
     emit("$number :$string")
 }.collect { value ->
     println( value )
 }



//结果
1 :a
2 :a
2 :b
2 :c

merge

  • 合并多个流成 一个流
  • 可以用在多级缓存等场景
val numberFlow = flowOf(12).onEach { delay(10) }
val stringFlow = flowOf("a""b""c").onEach { delay(15) }

listOf(numberFlow,stringFlow).merge()
                             .collect { value ->
                                 print(value)
                             }


// 1 a 2 b c

flattenConcat

  • 以顺序方式将给定的流展开为单个流
  • Flow<Flow<T>>的扩展函数
flow {
    emit(flowOf(123))
    emit(flowOf(456))
 } .flattenConcat().collect { value->
     print(value)
 }

// 1 2 3 4 5 6

flattenMerge

  • 作用和 flattenConcat 一样,但是可以设置并发收集流的数量。
  • 入参 concurrency, 当其 == 1 时,效果和 flattenConcat 一样,大于 1 时,则是并发收集。
flow {
    emit(flowOf(123).flowOn(Dispatchers.IO))
    emit(flowOf(456).flowOn(Dispatchers.IO))
    emit(flowOf(789).flowOn(Dispatchers.IO))
 }.flattenMerge(3).collect { value->
     print(value)
 }


//1 2 3 7 8 9 4 5 6 (顺序并不固定)

flatMapContact

  • 这是一个组合操作符,相当于 map + flattenConcat
  • 通过 map 转成一个流,再通过 flattenConcat 展开合并成一个流
flowOf(123).flatMapConcat {
     flowOf(it.toString() + " map")
 } .collect { value ->
     print ln (value)
 }

// 1 map 
// 2 map
// 3 map

flatMapLatest

  • 和其他 带 Latest的操作符 一样,如果下个值来了,上变换还没结束,就取消掉。
  • 相当于 transformLatest + emitAll
flow {
     emit("a")
     delay(100)
     emit("b")
 }.flatMapLatest { value ->
     flow {
         emit(value)
         delay(200)
         emit(value + "_last")
     }
 }.collect { value ->
     print(value)
 }

 // a b b_last

flatMapMerge

  • 等价于 map + flattenMerge
  • 也有 concurrency  这样一个参数,来限制并发数
flowOf("a","b","c","d","e","f").flatMapMerge(3) { value ->
     flow {
         emit(value)
     } .flowOn(Dispatchers.IO)
 }.collect { value ->
     print(value)
 }

// b a c d e f

zip

  • 对两个流进行组合,分别从二者取值,
  • 一旦一个流结束了,那整个过程就结束了。
val flow = flowOf(123).onEach { delay(10) }

val flow2 = flowOf("a""b""c""d").onEach { delay(15) }

flow.zip(flow2) { i, s -> i.toString() + s }.collect {
    println(it) 
}

 // Will print "1a 2b 3c"

10功能性操作符

cancellable

  • 接收的的时候判断 协程是否被取消
  • 如果已取消,则抛出异常
val job= flowOf(1,3,5,7).cancellable().onEach { value->
     print(value)
 } .launchIn(lifecycleScope)
 
 //取消
 job.cancel()

catch

  • 对上游异常进行捕获 ,对下游无影响
  • 上游 指的是 此操作符之前的流
  • 下游 指的是此操作符之后的流
flow<Int> {
  throw IOException("")     
 } .catch { e ->
  if(e is IOException){
        //...
 }
}

retryWhen

  • 有条件的进行重试
  • lambda 中有两个参数, 一个是 异常原因,一个是当前重试的 index (从0开始)
  • lambda 的返回值 为 Boolean ,
    • true则继续重试
    • false 则结束重试
flow<Int> {
    print("doing")
    throw IOException("")
 } .retryWhen { cause,attempt->
     if(attempt > 4){
        return@retryWhen false
     }
    cause is IOException
 }

retry

  • 重试机制 ,当流发生异常时可以重新执行, retryWhen 的简化版。
  • 参数 retries: Long  指定重试次数,以及控制是否继续重试 (默认为true)
flow<Int> {
  throw IOException("")     
 }. retry (3){ e->
  if(e is IOException){
      true
  }else {
      false
  }
}


flow<Int> {
  throw IOException("")     
 }.retry(3)

buffer

  • 如果消费赶不上生产,可使用 buffer 创建缓冲
  • capacity: Int = BUFFERED 缓冲区的容量
  • onBufferOverflow: BufferOverflow : 溢出时的策略
    • SUSPEND 挂起,
    • DROP_OLDEST 丢掉旧的,
    • DROP_LATEST 丢掉新的
flowOf("A""B""C")
    .onEach  { println("1$it") }
    .collect { println("2$it") }

    
Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--


flowOf("A""B""C")
 .onEach  { println("1$it") }
 .buffer()  // <--------------- buffer between onEach and collect
 .collect { println("2$it") }


P : -->-- [1A] -- [1B] -- [1C] ---------->--  // flowOf(...).onEach { ... }

                      |
                      | channel               // buffer()
                      V

Q : -->---------- [2A] -- [2B] -- [2C] -->--  // collect

conflate

  • 仅保留最新值
  • 内部就是 buffer(CONFLATED)
flow {
      repeat(30) {
      delay(100)
      emit(it)
    }
 }.conflate().onEach { delay(1000) } .collect { value ->
     print(value)
 }

// 0 7 15 22 29  (结果不固定)

flowOn

  • 指定上游操作的执行线程 , 用来切换流的线程
flow.map { ... } // Will be executed in IO
 . flowOn (Dispatchers.IO) // This one takes precedence
 . collect{ ... }

11总结

以上就是 Kotlin Flow 所有操作符的基本用法,在实际场景中按需使用。

比如上面说的:

  • 搜索场景使用debounce防抖,
  • 网络请求使用retry,
  • 多级缓存使用merge
  • 组件通信使用SharedFlow,
  • 数据合并使用combine等
  • ...



~ FIN ~


推荐阅读
玩转 MotionLayout:实战效果展示
关于Java字节码,了解这些就够了
【Kotlin协程】Channel 与 Flow 深入解析
FragmentFactory 在 Koin 中的应用


加好友拉你进群,技术干货聊不停


↓关注公众号↓↓添加微信交流↓



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存